Positive Technologies Application Inspector
Scan results
December 2, 2022
| Project | tornado |
| Scan target | tornado |
| Site address | http://localhost |
| Programming language | Python |
| Version | 4.2.0.16614 |
| Scan duration | 0:04:53 |
| Algorithm | Search from entry points; search from accessible public and protected methods |
| Disabled modules | Black-box search |
Source code checks
Total vulnerabilities found
| Confirmed | 0 |
| Refuted | 0 |
| Unprocessed | 359 (0) |
| ----------------------- | |
| New | 54 |
| From previous scans | 305 |
| ----------------------- | |
| Excluded | 0 |
Security policy
The security policy was not checked
Vulnerability distribution
By severity
| High | 32 |
| Medium | 38 |
| Potential | 289 |
By type
| SQL injection | 11 |
| Improper neutralization of directives in dynamically evaluated code (eval injection) | 2 |
| Server-side request forgery | 2 |
| Path disclosure | 1 |
| Remote code execution | 16 |
| Cross-site scripting | 3 |
| Open redirect | 30 |
| Log forging | 5 |
| jQuery: HTML code added to the DOM | 8 |
| jQuery: unsafe DOM function | 4 |
| Use of assert statement | 176 |
| Use of urllib | 1 |
| Use of hard-coded password | 47 |
| Use of deprecated function | 1 |
| Insecure SSL settings | 4 |
| Potential deserialization of arbitrary data | 2 |
| Empty exception handling block | 30 |
| Information disclosure in static files or constants | 2 |
| Static random number generator | 5 |
| Vulnerable hash functions | 9 |
By OWASP Top 10 2021 category
| A01 - Broken Access Control | 33 |
| A02 - Cryptographic Failures | 14 |
| A03 - Injection | 44 |
| A04 - Insecure Design | 5 |
| A05 - Security Misconfiguration | 30 |
| A07 - Identification and Authentication Failures | 47 |
| A08 - Software and Data Integrity Failures | 2 |
| A09 - Security Logging and Monitoring Failures | 5 |
| A10 - Server-Side Request Forgery (SSRF) | 2 |
| Other | 177 |
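The three distribution tables above each partition the same set of 359 unprocessed findings. A minimal sketch that checks this arithmetic, with the counts copied from the tables (the variable names are illustrative only):

```python
# Counts copied from the report's distribution tables.
total_findings = 359

by_severity = {"High": 32, "Medium": 38, "Potential": 289}

# By-type counts, in the order the report lists them.
by_type = [11, 2, 2, 1, 16, 3, 30, 5, 8, 4, 176, 1, 47, 1, 4, 2, 30, 2, 5, 9]

# OWASP Top 10 2021 category counts, including "Other".
by_owasp = [33, 14, 44, 5, 30, 47, 2, 5, 2, 177]

# New findings plus findings carried over from previous scans.
new_plus_previous = 54 + 305

sums = (sum(by_severity.values()), sum(by_type), sum(by_owasp), new_plus_previous)
print(sums)
```

Every sum should equal the total, which confirms that each finding is counted exactly once per breakdown.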
Vulnerabilities found
New (54)
| Traced value | stmt |
        for val, desc in zip(row, cur.description):
            obj[desc.name] = val
        return obj
    async def execute(self, stmt, *args):
        """Execute a SQL statement.
        Must be called with ``await self.execute(...)``
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
    async def query(self, stmt, *args):
        """Query for a list of results.
        Typical usage::

| Traced value | args |
        for val, desc in zip(row, cur.description):
            obj[desc.name] = val
        return obj
    async def execute(self, stmt, *args):
        """Execute a SQL statement.
        Must be called with ``await self.execute(...)``
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
    async def query(self, stmt, *args):
        """Query for a list of results.
        Typical usage::
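
Both findings above trace a parameter of execute() into cur.execute(stmt, args), where the statement text and its arguments travel to the driver separately. A minimal sketch of why that separation matters, assuming the standard-library sqlite3 module as a stand-in for the demo's actual database driver. The table, the sample values, and the `?` placeholder style are illustrative assumptions; the demo code itself uses `%s` placeholders.

```python
import sqlite3

# In-memory database with one row, purely for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entries (slug TEXT, title TEXT)")
conn.execute("INSERT INTO entries VALUES ('hello', 'Hello world')")

# Hypothetical attacker-controlled value.
hostile_slug = "nope' OR '1'='1"

# Unsafe: the value is spliced into the statement text, so the quote
# characters in it change the structure of the query.
unsafe_stmt = "SELECT title FROM entries WHERE slug = '%s'" % hostile_slug
unsafe_rows = conn.execute(unsafe_stmt).fetchall()

# Safer: the value is bound as a parameter and is only ever compared
# against the slug column as data.
safe_rows = conn.execute(
    "SELECT title FROM entries WHERE slug = ?", (hostile_slug,)
).fetchall()

print(unsafe_rows, safe_rows)
```

The spliced query matches every row, while the bound query matches none. Parameter binding protects only the arguments: if the statement text itself is built from untrusted input, binding does not help.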

| Traced value | stmt |
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
    async def query(self, stmt, *args):
        """Query for a list of results.
        Typical usage::
            results = await self.query(...)

            for row in await self.query(...)
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
            return [self.row_to_obj(row, cur) for row in await cur.fetchall()]
    async def queryone(self, stmt, *args):
        """Query for exactly one result.

| Traced value | args |
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
    async def query(self, stmt, *args):
        """Query for a list of results.
        Typical usage::
            results = await self.query(...)

            for row in await self.query(...)
        """
        with (await self.application.db.cursor()) as cur:
            await cur.execute(stmt, args)
            return [self.row_to_obj(row, cur) for row in await cur.fetchall()]
    async def queryone(self, stmt, *args):
        """Query for exactly one result.

| Vulnerable code (line 58) | eval("(" + response + ")") |
| Traced value | response |
jQuery.postJSON = function(url, args, callback) {
    args._xsrf = getCookie("_xsrf");
    $.ajax({url: url, data: $.param(args), dataType: "text", type: "POST",
            success: function(response) {
        if (callback) callback(eval("(" + response + ")"));
    }, error: function(response) {
        console.log("ERROR:", response);
    }});

| Vulnerable code (line 102) | eval("(" + response + ")") |
| Traced value | response |
                data: $.param(args), success: updater.onSuccess,
                error: updater.onError});
    },
    onSuccess: function(response) {
        try {
            updater.newMessages(eval("(" + response + ")"));
        } catch (e) {
            updater.onError();
            return;
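
The two findings above concern a server response that is turned into an object by evaluating it as code. In the browser, JSON.parse is the usual replacement for that eval call. The same principle is sketched here in Python, the language of the scanned project, with the standard-library json module; the payload strings are hypothetical.

```python
import json

# A well-formed JSON payload, as a chat server might return it.
payload = '{"messages": [{"id": "1", "body": "hi"}]}'
data = json.loads(payload)

# A payload that is an expression rather than JSON. A parser rejects it;
# an evaluator would run it.
hostile = '__import__("os").getcwd()'
try:
    json.loads(hostile)
    rejected = False
except json.JSONDecodeError:
    rejected = True

print(data["messages"][0]["body"], rejected)
```

A parser accepts only data, so a hostile response can at worst fail to parse.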

| Vulnerable code (line 129) | client.request(b"GET", utf8(url)) |
| Traced value | url |
        fut.add_done_callback(lambda f: self.stop_loop())
        runner()
        return fut.result()
    def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b"GET", utf8(url))
        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

| Vulnerable code (line 174) | client.request(b"GET", utf8(url)) |
| Traced value | url |
        runner()
        self.assertTrue(chunks)
        return b"".join(chunks)
    def twisted_coroutine_fetch(self, url, runner):
        body = [None]
        @gen.coroutine
        def f():

            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b"GET", utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
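
The two findings above trace a url parameter into an outgoing client.request call. A minimal sketch of one common mitigation, checking the URL against an allowlist before fetching it, using only the standard library. ALLOWED_HOSTS and is_allowed_url are hypothetical names introduced for illustration, not part of the scanned code.

```python
from urllib.parse import urlsplit

# Hypothetical allowlist of hosts the application may contact.
ALLOWED_HOSTS = {"127.0.0.1", "localhost"}


def is_allowed_url(url: str) -> bool:
    """Return True only for http(s) URLs whose host is on the allowlist."""
    parts = urlsplit(url)
    return parts.scheme in ("http", "https") and parts.hostname in ALLOWED_HOSTS


print(is_allowed_url("http://127.0.0.1:8080/hello"))
print(is_allowed_url("http://169.254.169.254/latest/meta-data"))
```

Comparing the parsed hostname, rather than searching the raw string, also rejects URLs such as http://localhost@evil.example/, where the allowed name appears only in the userinfo part.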

| Traced value | file |
    return sockets
if hasattr(socket, "AF_UNIX"):
    def bind_unix_socket(
        file: str, mode: int = 0o600, backlog: int = _DEFAULT_...
    ) -> socket.socket:
        """Creates a listening unix socket.
        If a socket with the given name already exists, it will be deleted.

            else:
                raise ValueError("File %s exists and is not a socket", file)
        sock.bind(file)
        os.chmod(file, mode)
        sock.listen(backlog)
        return sock
def add_accept_handler(
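
The second fragment above shows the tail of a check that refuses to replace an existing path unless it is a socket. A minimal sketch of such a check in isolation, assuming an os.stat and stat.S_ISSOCK test (the fragment does not show which calls the original uses). remove_stale_socket is a hypothetical helper name, and the sketch formats the error message with `%`.

```python
import os
import stat
import tempfile


def remove_stale_socket(path: str) -> None:
    """Delete path if it is a socket; refuse to touch any other file type."""
    try:
        st = os.stat(path)
    except FileNotFoundError:
        return  # Nothing to clean up.
    if stat.S_ISSOCK(st.st_mode):
        os.remove(path)
    else:
        raise ValueError("File %s exists and is not a socket" % path)


# Usage: a regular file at the target path is left alone and reported.
with tempfile.TemporaryDirectory() as tmp:
    regular = os.path.join(tmp, "not-a-socket")
    open(regular, "w").close()
    try:
        remove_stale_socket(regular)
        refused = False
    except ValueError:
        refused = True
    still_there = os.path.exists(regular)
    missing_result = remove_stale_socket(os.path.join(tmp, "missing"))
```

Checking the file type before deleting limits what a caller-supplied path can remove, although a check followed by a separate delete can still race with other processes.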

| Traced value | code |
    except AttributeError:
        raise ImportError("No module named %s" % parts[-1])
def exec_in(
    code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
) -> None:
    if isinstance(code, str):
        # exec(string) inherits the caller's future imports; compile
        # the string first to prevent that.
        code = compile(code, "<string>", "exec", dont_inherit=True)
    exec(code, glob, loc)
def raise_exc_info(
    exc_info,  # type: Tuple[Optional[type], Optional[BaseException], Optional[TracebackType]]
):

| Traced value | glob |
    except AttributeError:
        raise ImportError("No module named %s" % parts[-1])
def exec_in(
    code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
) -> None:
    if isinstance(code, str):
        # exec(string) inherits the caller's future imports; compile
        # the string first to prevent that.
        code = compile(code, "<string>", "exec", dont_inherit=True)
    exec(code, glob, loc)
def raise_exc_info(
    exc_info,  # type: Tuple[Optional[type], Optional[BaseException], Optional[TracebackType]]
):

| Traced value | loc |
    except AttributeError:
        raise ImportError("No module named %s" % parts[-1])
def exec_in(
    code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
) -> None:
    if isinstance(code, str):
        # exec(string) inherits the caller's future imports; compile
        # the string first to prevent that.
        code = compile(code, "<string>", "exec", dont_inherit=True)
    exec(code, glob, loc)
def raise_exc_info(
    exc_info,  # type: Tuple[Optional[type], Optional[BaseException], Optional[TracebackType]]
):
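
The three findings above all end at the exec call inside exec_in, one per parameter. A minimal sketch of what that function does, reassembled from the fragments in this report so that it runs on its own. The doubled Optional in the reported signature is simplified to a single Optional here, and the sample statement and `namespace` variable are illustrative.

```python
from typing import Any, Dict, Mapping, Optional


def exec_in(
    code: Any, glob: Dict[str, Any], loc: Optional[Mapping[str, Any]] = None
) -> None:
    if isinstance(code, str):
        # Compile first with dont_inherit=True so the string does not pick
        # up the caller's __future__ imports.
        code = compile(code, "<string>", "exec", dont_inherit=True)
    exec(code, glob, loc)


# Usage: the code string runs with full interpreter privileges and writes
# its results into the namespace it is given.
namespace: Dict[str, Any] = {}
exec_in("x = 6 * 7", namespace)
print(namespace["x"])
```

Compiling with dont_inherit=True changes only how the string is compiled. It does not sandbox the code, so whether these findings matter depends on whether an untrusted party can ever influence the string that reaches exec_in.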

| Traced value | statements |
        with mock.patch("random.random", mock_random):
            self.assertEqual(self.simulate_calls(pc, call_durations), expected)
class TestIOLoopConfiguration(unittest.TestCase):
    def run_python(self, *statements):
        stmt_list = [
            "from tornado.ioloop import IOLoop",
            "classname = lambda x: x.__class__.__name__",
        ] + list(statements)
        args = [sys.executable, "-c", "; ".join(stmt_list)]
        return native_str(subprocess.check_output(args)).strip()
    def test_default(self):
        # When asyncio is available, it is used by default.
        cls = self.run_python("print(classname(IOLoop.current()))")

| Vulnerable code (line 208) | subprocess.Popen([sys.executable, "-c", program] + (args or []), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| Traced value | statement |
            handler.close()
class LoggingOptionTest(unittest.TestCase):
    """Test the ability to enable and disable Tornado's logging hooks."""
    def logs_present(self, statement, args=None):
        # Each test may manipulate and/or parse the options and then logs
        # a line at the 'info' level. This level is ignored in the
        # logging module by default, but Tornado turns it on by default

        # ran.
        IMPORT = "from tornado.options import options, parse_command_line"
        LOG_INFO = 'import logging; logging.info("hello")'
        program = ";".join([IMPORT, statement, LOG_INFO])
        proc = subprocess.Popen(
            [sys.executable, "-c", program] + (args or []),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

| Traced value | args |
            handler.close()
class LoggingOptionTest(unittest.TestCase):
    """Test the ability to enable and disable Tornado's logging hooks."""
    def logs_present(self, statement, args=None):
        # Each test may manipulate and/or parse the options and then logs
        # a line at the 'info' level. This level is ignored in the
        # logging module by default, but Tornado turns it on by default

        IMPORT = "from tornado.options import options, parse_command_line"
        LOG_INFO = 'import logging; logging.info("hello")'
        program = ";".join([IMPORT, statement, LOG_INFO])
        proc = subprocess.Popen(
            [sys.executable, "-c", program] + (args or []),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

| Traced value | code |
    # processes, each of which prints its task id to stdout (a single
    # byte, so we don't have to worry about atomicity of the shared
    # stdout stream) and then exits.
    def run_subproc(self, code):
        proc = subprocess.Popen(
            sys.executable, stdin=subprocess.PIPE, stdout=subprocess.PIPE
        )
        proc.stdin.write(utf8(code))
        proc.stdin.close()
        proc.wait()
        stdout = proc.stdout.read()
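
The findings above (run_python, logs_present, run_subproc) all build a Python program as text and hand it to a child interpreter. A minimal sketch, using only the standard library, of one way to keep untrusted data out of the program text: pass it as a separate argv element and let a fixed program read it. The `program` and `untrusted` values are hypothetical.

```python
import subprocess
import sys

# Fixed program text: it only echoes back its first argument.
program = "import sys; print(sys.argv[1])"

# Hypothetical untrusted input that would break out of a string literal
# if it were spliced into the program text.
untrusted = '"); import os; os.system("echo injected'

# The value travels as its own argv element, so the child interpreter
# never parses it as code, and no shell is involved.
output = subprocess.check_output([sys.executable, "-c", program, untrusted])
echoed = output.decode().strip()
print(echoed == untrusted)
```

This helps only when the program text itself is fixed. In the test helpers above the statements are the payload by design, so the relevant question is who is able to supply them.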

| Vulnerable code (line 93) | exec(textwrap.dedent(s), global_namespace, local_namespace) |
| Traced value | caller_globals |
    conn.close()
    server_socket.close()
    return (client_socket.close, client_addr[1])
def exec_test(caller_globals, caller_locals, s):
    """Execute ``s`` in a given context and return the result namespace.
    Used to define functions for tests in particular python
    versions that would be syntax errors in older versions.

    # Flatten the real global and local namespace into our fake
    # globals: it's all global from the perspective of code defined
    # in s.
    global_namespace = dict(caller_globals, **caller_locals)  # type: ignore
    local_namespace = {}  # type: typing.Dict[str, typing.Any]
    exec(textwrap.dedent(s), global_namespace, local_namespace)
    return local_namespace
def subTest(test, *args, **kwargs):
    """Compatibility shim for unittest.TestCase.subTest.

| Traced value | caller_locals |
    conn.close()
    server_socket.close()
    return (client_socket.close, client_addr[1])
def exec_test(caller_globals, caller_locals, s):
    """Execute ``s`` in a given context and return the result namespace.
    Used to define functions for tests in particular python
    versions that would be syntax errors in older versions.

    # Flatten the real global and local namespace into our fake
    # globals: it's all global from the perspective of code defined
    # in s.
    global_namespace = dict(caller_globals, **caller_locals)  # type: ignore
    local_namespace = {}  # type: typing.Dict[str, typing.Any]
    exec(textwrap.dedent(s), global_namespace, local_namespace)
    return local_namespace
def subTest(test, *args, **kwargs):
    """Compatibility shim for unittest.TestCase.subTest.

| Traced value | s |
    conn.close()
    server_socket.close()
    return (client_socket.close, client_addr[1])
def exec_test(caller_globals, caller_locals, s):
    """Execute ``s`` in a given context and return the result namespace.
    Used to define functions for tests in particular python
    versions that would be syntax errors in older versions.

    # in s.
    global_namespace = dict(caller_globals, **caller_locals)  # type: ignore
    local_namespace = {}  # type: typing.Dict[str, typing.Any]
    exec(textwrap.dedent(s), global_namespace, local_namespace)
    return local_namespace
def subTest(test, *args, **kwargs):
    """Compatibility shim for unittest.TestCase.subTest.
| Уязвимый код | 156 self.render("entry.html", entry=entry) |
|
|
|
            return
        self.render("home.html", entries=entries)

class EntryHandler(BaseHandler):
    async def get(self, slug):
        entry = await self.queryone("SELECT * FROM entries WHERE slug = %s", slug)
        if not entry:
            raise tornado.web.HTTPError(404)
        self.render("entry.html", entry=entry)

class ArchiveHandler(BaseHandler):
    async def get(self):
        entries = await self.query("SELECT * FROM entries ORDER BY published DESC")
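The handler in this trace passes `slug` to `queryone` as a separate argument instead of formatting it into the SQL text. The sketch below illustrates why that distinction matters, using the standard-library `sqlite3` module as a stand-in. This is an assumption: the report does not show the demo's database driver, and `sqlite3` uses `?` placeholders rather than `%s`. The table contents and the `query_one` helper are hypothetical.

```python
import sqlite3

# In-memory database with one hypothetical entry.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entries (slug TEXT, title TEXT)")
conn.execute("INSERT INTO entries VALUES (?, ?)", ("hello", "Hello world"))


def query_one(slug):
    """Look up a single entry; the slug is bound as data, never as SQL text."""
    cur = conn.execute(
        "SELECT slug, title FROM entries WHERE slug = ?", (slug,)
    )
    return cur.fetchone()


found = query_one("hello")
# A classic injection payload is compared literally and matches no row.
injected = query_one("' OR '1'='1")
```

Because the driver binds the value, the payload cannot change the structure of the query. Any remaining risk in the flagged line would come from how the template renders `entry`.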
| Vulnerable code | 1798 self._active_modules[name].render(*args, **kwargs) |
        exc_info=(typ, value, tb),
    )

def _ui_module(self, name: str, module: Type["UIModule"]) -> Callable[..., str]:
    def render(*args, **kwargs) -> str:  # type: ignore
        if not hasattr(self, "_active_modules"):
            self._active_modules = {}  # type: Dict[str, UIModule]
        if name not in self._active_modules:
            self._active_modules[name] = module(self)
        rendered = self._active_modules[name].render(*args, **kwargs)
        return rendered
    return render

def _ui_method(self, method: Callable[..., str]) -> Callable[..., str]:
| Vulnerable code | 114 handler.redirect(endpoint + "?" + urllib.parse.urlencode(args)) |
def authenticate_redirect(
    self,
    callback_uri: Optional[str] = None,
    ax_attrs: List[str] = ["name", "email", "language", "username"],
) -> None:
    """Redirects to the authentication URL for this service.
    ...
    synchronous function.
    """
    handler = cast(RequestHandler, self)
    callback_uri = callback_uri or handler.request.uri
    assert callback_uri is not None
    args = self._openid_args(callback_uri, ax_attrs=ax_attrs)
    endpoint = self._OPENID_ENDPOINT  # type: ignore
    handler.redirect(endpoint + "?" + urllib.parse.urlencode(args))

async def get_authenticated_user(
    self, http_client: Optional[httpclient.AsyncHTTPClient] = None
) -> Dict[str, Any]:
    # ...
    )
    return self._on_authentication_verified(resp)

def _openid_args(
    self,
    callback_uri: str,
    ax_attrs: Iterable[str] = [],
    oauth_scope: Optional[str] = None,
) -> Dict[str, str]:
    handler = cast(RequestHandler, self)
    url = urllib.parse.urljoin(handler.request.full_url(), callback_uri)
    args = {
        "openid.ns": "http://specs.openid.net/auth/2.0",
        "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
        "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
        "openid.return_to": url,
        "openid.realm": urllib.parse.urljoin(url, "/"),
        "openid.mode": "checkid_setup",
    }
    if ax_attrs:
        args.update(
            {
                "openid.ns.ax": "http://openid.net/srv/ax/1.0",
                "openid.ax.mode": "fetch_request",
        # ...
        if "name" in ax_attrs:
            ax_attrs -= set(["name", "firstname", "fullname", "lastname"])
            required += ["firstname", "fullname", "lastname"]
            args.update(
                {
                    "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
                    "openid.ax.type.fullname": "http://axschema.org/namePerson",
        # ...
            "username": "http://axschema.org/namePerson/friendly",
        }
        for name in ax_attrs:
            args["openid.ax.type." + name] = known_attrs[name]
            required.append(name)
        args["openid.ax.required"] = ",".join(required)
    if oauth_scope:
        args.update(
            {
                # ...
                "openid.oauth.scope": oauth_scope,
            }
        )
    return args

def _on_authentication_verified(
    self, response: httpclient.HTTPResponse
) -> Dict[str, Any]:
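If this finding is triaged as an open-redirect risk, one common mitigation is to resolve the callback against the current request URL and accept it only when the result stays on an expected host. The sketch below uses only `urllib.parse`. The function `resolve_callback`, its `allowed_hosts` parameter, and the sample URLs are hypothetical and are not part of Tornado's API.

```python
from urllib.parse import urljoin, urlsplit


def resolve_callback(base_url, callback_uri, allowed_hosts):
    """Return the absolute callback URL, or None if it leaves the allowlist.

    Hypothetical helper: resolves ``callback_uri`` the same way the excerpt
    does (urljoin against the request URL) and then checks scheme and host.
    """
    resolved = urljoin(base_url, callback_uri)
    parts = urlsplit(resolved)
    if parts.scheme not in ("http", "https"):
        return None
    if parts.hostname not in allowed_hosts:
        return None
    return resolved


base = "http://localhost/login"
hosts = {"localhost"}

same_site = resolve_callback(base, "/done", hosts)
# A scheme-relative URL silently switches host when joined.
other_host = resolve_callback(base, "//evil.example/x", hosts)
# Non-HTTP schemes are rejected outright.
bad_scheme = resolve_callback(base, "javascript:alert(1)", hosts)
```

Checking the resolved URL rather than the raw input matters because `urljoin` lets a scheme-relative value such as `//evil.example/x` replace the host.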
| Vulnerable code | 438 handler.redirect(authorize_url + "?" + urllib.parse.urlencode(args)) |
    return url + "?" + urllib.parse.urlencode(args)

def _on_request_token(
    self,
    authorize_url: str,
    callback_uri: Optional[str],
    response: httpclient.HTTPResponse,
) -> None:
    handler = cast(RequestHandler, self)
    request_token = _oauth_parse_response(response.body)
    data = (
        base64.b64encode(escape.utf8(request_token["key"]))
        + b"|"
        + base64.b64encode(escape.utf8(request_token["secret"]))
    )
    handler.set_cookie("_oauth_request_token", data)
    args = dict(oauth_token=request_token["key"])
    if callback_uri == "oob":
        handler.finish(authorize_url + "?" + urllib.parse.urlencode(args))
        return
    elif callback_uri:
        args["oauth_callback"] = urllib.parse.urljoin(
            handler.request.full_url(), callback_uri
        )
    handler.redirect(authorize_url + "?" + urllib.parse.urlencode(args))

def _oauth_access_token_url(self, request_token: Dict[str, Any]) -> str:
    consumer_token = self._oauth_consumer_token()
    url = self._OAUTH_ACCESS_TOKEN_URL  # type: ignore
    # ...

    if isinstance(val, unicode_type):
        val = val.encode("utf-8")
    return urllib.parse.quote(val, safe="~")

def _oauth_parse_response(body: bytes) -> Dict[str, Any]:
    # I can't find an officially-defined encoding for oauth responses and
    # have never seen anyone use non-ascii. Leave the response in a byte
    # string for python 2, and use utf8 on python 3.
    body_str = escape.native_str(body)
    p = urllib.parse.parse_qs(body_str, keep_blank_values=False)
    token = dict(key=p["oauth_token"][0], secret=p["oauth_token_secret"][0])
    # Add the extra parameters the Provider included to the token
    special = ("oauth_token", "oauth_token_secret")
    token.update((k, p[k][0]) for k in p if k not in special)
    return token
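The redirect target in this finding is built as `authorize_url + "?" + urllib.parse.urlencode(args)`. The sketch below uses a hypothetical provider URL and callback value, neither taken from the scanned project. It shows that `urlencode` percent-encodes each value, so a callback containing `?` or `&` remains a single `oauth_callback` parameter and does not change the host of the redirect.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Hypothetical values standing in for the ones used at runtime.
authorize_url = "https://provider.example/oauth/authorize"
args = {
    "oauth_token": "abc",
    "oauth_callback": "http://localhost/cb?next=/a&b=1",
}

# Same construction as the flagged line.
redirect_target = authorize_url + "?" + urlencode(args)

# Parse the result back to confirm nothing leaked out of the query string.
parts = urlsplit(redirect_target)
decoded = parse_qs(parts.query)
```

Under these assumptions the redirect host is controlled by `authorize_url` alone. Whether the finding is exploitable therefore depends on where `authorize_url` comes from, which this excerpt does not show.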
| Vulnerable code | 586 handler.redirect(url_concat(url, args)) |
def authorize_redirect(
self,
redirect_uri
: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
"""
def authorize_redirect(
self,
redirect_uri
: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["
redirect_uri
"] = redirect_uri
if client_id is not None:
args["client_id"] = client_id
if extra_params:
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"]
= redirect_uri
if client_id is not None:
args["client_id"] = client_id
if extra_params:
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url,
args
))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(
url_concat(url, args)
)
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
|
|
|
def authorize_redirect(
self,
redirect_uri: Optional[str] = None,
client_id
: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
scope: Optional[str] = None,
def authorize_redirect(
self,
redirect_uri: Optional[str] = None,
client_id
: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
scope: Optional[str] = None,
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
args["
client_id
"] = client_id
if extra_params:
args.update(extra_params)
if scope:
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
args["client_id"]
= client_id
if extra_params:
args.update(extra_params)
if scope:
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url,
args
))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(
url_concat(url, args)
)
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
|
|
|
redirect_uri: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params
: Optional[Dict[str, Any]] = None,
scope: Optional[str] = None,
response_type: str = "code",
) -> None:
redirect_uri: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params
: Optional[Dict[str, Any]] = None,
scope: Optional[str] = None,
response_type: str = "code",
) -> None:
if client_id is not None:
args["client_id"] = client_id
if extra_params:
args.update(
extra_params
)
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
if client_id is not None:
args["client_id"] = client_id
if extra_params:
args.update(extra_params)
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
if extra_params:
args.update(extra_params)
if scope:
args
["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
if extra_params:
args.update(extra_params)
if scope:
args["scope"]
= " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url,
args
))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(
url_concat(url, args)
)
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
|
|
|
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
scope
: Optional[str] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
scope
: Optional[str] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
if extra_params:
args.update(extra_params)
if scope:
args["
scope
"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
if extra_params:
args.update(extra_params)
if scope:
args["scope"] = "
".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
if extra_params:
args.update(extra_params)
if scope:
args["scope"]
= " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url,
args
))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(
url_concat(url, args)
)
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
|
|
|
client_secret: Optional[str] = None,
extra_params: Optional[Dict[str, Any]] = None,
scope: Optional[str] = None,
response_type: str = "code",
) -> None:
"""Redirects the user to obtain OAuth authorization for this service.
Some providers require that you register a redirect URL with
this is now an ordinary synchronous function.
"""
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
this is now an ordinary synchronous function.
"""
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
this is now an ordinary synchronous function.
"""
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
this is now an ordinary synchronous function.
"""
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
args["client_id"] = client_id
if extra_params:
handler = cast(RequestHandler, self)
args = {"response_type": response_type}
if redirect_uri is not None:
args["redirect_uri"] = redirect_uri
if client_id is not None:
args["client_id"] = client_id
if extra_params:
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
if scope:
args["scope"] = " ".join(scope)
url = self._OAUTH_AUTHORIZE_URL # type: ignore
handler.redirect(url_concat(url, args))
def _oauth_request_token_url(
self,
redirect_uri: Optional[str] = None,
| Vulnerable code | 2492 self.redirect(to_url, permanent=self._permanent) |
def initialize(self, url: str, permanent: bool = True) -> None:
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
class StaticFileHandler(RequestHandler):
"""A simple handler that can serve static content from a directory.
A `StaticFileHandler` is configured automatically if you pass the
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
class StaticFileHandler(RequestHandler):
"""A simple handler that can serve static content from a directory.
A `StaticFileHandler` is configured automatically if you pass the
def initialize(self, url: str, permanent: bool = True) -> None:
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
self._url = url
self._permanent = permanent
def get(self, *args: Any, **kwargs: Any) -> None:
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
to_url = self._url.format(*args, **kwargs)
if self.request.query_arguments:
# TODO: figure out typing for the next line.
to_url = httputil.url_concat(
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
class StaticFileHandler(RequestHandler):
"""A simple handler that can serve static content from a directory.
A `StaticFileHandler` is configured automatically if you pass the
to_url,
list(httputil.qs_to_qsl(self.request.query_arguments)), # type: ignore
)
self.redirect(to_url, permanent=self._permanent)
class StaticFileHandler(RequestHandler):
"""A simple handler that can serve static content from a directory.
A `StaticFileHandler` is configured automatically if you pass the
| Vulnerable code | 92 self.redirect(self.reverse_url("countdown", count - 1)) |
def get(self):
self.finish(self.request.headers["Authorization"])
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
self.finish(self.request.headers["Authorization"])
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.finish(self.request.headers["Authorization"])
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.finish(self.request.headers["Authorization"])
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.write("Zero")
class EchoPostHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.write("Zero")
class EchoPostHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.write("Zero")
class EchoPostHandler(RequestHandler):
def get(self, count):
count = int(count)
if count > 0:
self.redirect(self.reverse_url("countdown", count - 1))
else:
self.write("Zero")
class EchoPostHandler(RequestHandler):
| Vulnerable code | 652 self.redirect("/", permanent=int(self.get_argument("permanent"))) |
self.application = ObjectDict(
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
def get_cookie(self, name):
return self._cookies.get(name)
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
def get_cookie(self, name):
return self._cookies.get(name)
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
# See SignedValueTest below for more.
self.add_header("x-multi", 3)
self.add_header("X-Multi", "4")
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
| Vulnerable code | 654 self.redirect("/", status=int(self.get_argument("status"))) |
self.application = ObjectDict(
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
def get_cookie(self, name):
return self._cookies.get(name)
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
settings=dict(cookie_secret=cookie_secret, key_version=key_version)
)
def get_cookie(self, name):
return self._cookies.get(name)
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
# See SignedValueTest below for more.
self.add_header("x-multi", 3)
self.add_header("X-Multi", "4")
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
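The trace above shows the `status` request argument being converted with `int(...)` and passed to `self.redirect` without further checks. One possible mitigation is to accept only a fixed set of redirect codes. The sketch below is a minimal illustration under that assumption; `parse_redirect_status` and `ALLOWED_REDIRECT_STATUSES` are hypothetical names, not part of Tornado or of the scanned project.

```python
from typing import Optional

# Redirect codes this sketch is willing to emit (an assumption; adjust to the application).
ALLOWED_REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})


def parse_redirect_status(raw: Optional[str], default: int = 302) -> int:
    """Return an allow-listed redirect status, falling back to `default`."""
    if raw is None:
        return default
    try:
        status = int(raw)
    except ValueError:
        # Non-numeric input is ignored rather than raised to the caller.
        return default
    return status if status in ALLOWED_REDIRECT_STATUSES else default
```

A handler could then call something like `self.redirect("/", status=parse_redirect_status(self.get_argument("status", None)))`, so that out-of-range or malformed values never reach the redirect call.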
| Vulnerable code | 22 self.redirect(path, status=int(self.get_argument('status', '302'))) |
def get(self):
self.write("Hello world")
class RedirectHandler(RequestHandler):
def get(self, path):
self.redirect(path, status=int(self.get_argument('status', '302')))
class PostHandler(RequestHandler):
def post(self):
self.write("Hello world")
class RedirectHandler(RequestHandler):
def get(self, path):
self.redirect(path, status=int(self.get_argument('status', '302')))
class PostHandler(RequestHandler):
def post(self):
assert self.get_argument('foo') == 'bar'
self.write("Hello world")
class RedirectHandler(RequestHandler):
def get(self, path):
self.redirect(path, status=int(self.get_argument('status', '302')))
class PostHandler(RequestHandler):
def post(self):
assert self.get_argument('foo') == 'bar'
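In the trace above, the `path` route argument reaches `self.redirect` unchanged, which is the pattern the scanner reports as an open redirect. A minimal sketch of one common check follows, assuming the intent is to allow only same-site absolute paths; `is_local_redirect_target` is a hypothetical helper, not part of Tornado, and the exact rules would depend on the application.

```python
from urllib.parse import urlsplit


def is_local_redirect_target(target: str) -> bool:
    """Accept only same-site absolute paths such as "/foo?x=1"."""
    # Reject empty values, backslashes and control characters, which some
    # browsers normalise in ways that can change the destination host.
    if not target or "\\" in target or any(ord(ch) < 0x20 for ch in target):
        return False
    # Require exactly one leading slash: "//host" is a scheme-relative URL.
    if not target.startswith("/") or target.startswith("//"):
        return False
    parts = urlsplit(target)
    # A local path carries neither a scheme nor a network location.
    return parts.scheme == "" and parts.netloc == ""
```

A handler could call this before redirecting and fall back to a fixed location such as `"/"` when the check fails.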
$.ajax({url: url, data: $.param(args), dataType: "text", type: "POST",
success: function(response) {
if (callback) callback(eval("(" + response + ")"));
}, error: function(response) {
console.log("ERROR:", response);
}});
};
success: function(response) {
if (callback) callback(eval("(" + response + ")"));
}, error: function(response) {
console.log("ERROR:", response);
}});
};
jQuery.fn.formToDict = function() {
success: function(response) {
if (callback) callback(eval("(" + response + ")"));
}, error: function(response) {
console.log("ERROR:", response);
}});
};
jQuery.fn.formToDict = function() {
| Vulnerable code | 121 console.log(messages.length, "new messages, cursor:", updater.cursor) |
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(messages[i]);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
updater.showMessage(messages[i]);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
updater.onError();
return;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var
messages
= response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(
messages[i]
);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
|
|
|
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(
response
) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(
response
) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages =
response.messages
;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var
messages
= response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(
messages[i]
);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
|
|
|
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
|
|
|
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
}
},
showMessage: function(message) {
var existing = $("#m" +
message.id
);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing = $(
"#m" + message.id
);
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
}
},
showMessage: function(message) {
var existing =
$("#m" + message.id)
;
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
|
|
|
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
|
|
|
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(
response
) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
data: $.param(args), success: updater.onSuccess,
error: updater.onError});
},
onSuccess: function(
response
) {
try {
updater.newMessages(eval("(" + response + ")"));
} catch (e) {
},
onSuccess: function(response) {
try {
updater.newMessages(eval("(" +
response
+ ")"));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(eval(
"(" + response + ")"
));
} catch (e) {
updater.onError();
return;
},
onSuccess: function(response) {
try {
updater.newMessages(
eval("(" + response + ")")
);
} catch (e) {
updater.onError();
return;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(
response
) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages =
response.messages
;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var
messages
= response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(
messages[i]
);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var
node
= $(message.html);
node.hide();
$("#inbox").append(node);
node.slideDown();
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(
node
);
node.slideDown();
}
};
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(node)
;
node.slideDown();
}
};
|
|
|
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(
response
) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log("Poll error; sleeping for", updater.errorSleepTime, "ms");
window.setTimeout(updater.poll, updater.errorSleepTime);
},
newMessages: function(
response
) {
if (!response.messages) return;
var messages = response.messages;
updater.cursor = messages[messages.length - 1].id;
},
newMessages: function(response) {
if (!response.messages) return;
var messages =
response.messages
;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
},
newMessages: function(response) {
if (!response.messages) return;
var
messages
= response.messages;
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.cursor = messages[messages.length - 1].id;
console.log(messages.length, "new messages, cursor:", updater.cursor);
for (var i = 0; i < messages.length; i++) {
updater.showMessage(
messages[i]
);
}
},
showMessage: function(message) {
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var
node
= $(message.html);
node.hide();
$("#inbox").append(node);
node.slideDown();
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(
node
);
node.slideDown();
}
};
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(node)
;
node.slideDown();
}
};
|
|
|
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
updater.showMessage(messages[i]);
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var
node
= $(message.html);
node.hide();
$("#inbox").append(node);
node.slideDown();
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(
node
);
node.slideDown();
}
};
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(node)
;
node.slideDown();
}
};
|
|
|
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
updater.showMessage(JSON.parse(event.data));
}
},
showMessage: function(
message
) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(message.html);
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node = $(
message.html
);
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var node =
$(message.html)
;
node.hide();
$("#inbox").append(node);
node.slideDown();
showMessage: function(message) {
var existing = $("#m" + message.id);
if (existing.length > 0) return;
var
node
= $(message.html);
node.hide();
$("#inbox").append(node);
node.slideDown();
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(
node
);
node.slideDown();
}
};
if (existing.length > 0) return;
var node = $(message.html);
node.hide();
$("#inbox").append(node)
;
node.slideDown();
}
};
From previous scans (305)
| Vulnerable code | 197 self.execute( "UPDATE entries SET title = %s, markdown = %s, html = %s " "WHERE id = %s", title, text, html, int(id), ) |
|
|
|
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title
= self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
await self.execute(
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",
title
,
text,
html,
int(id),
except NoResultError:
raise tornado.web.HTTPError(404)
slug = entry.slug
await self.execute(
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",
title,
|
|
|
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text
= self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(
text
)
if id:
try:
entry = await self.queryone(
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html =
markdown.markdown(text)
if id:
try:
entry = await self.queryone(
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html
= markdown.markdown(text)
if id:
try:
entry = await self.queryone(
"WHERE id = %s",
title,
text,
html
,
int(id),
)
else:
except NoResultError:
raise tornado.web.HTTPError(404)
slug = entry.slug
await self.execute(
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",
title,
|
|
|
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text
= self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",
title,
text
,
html,
int(id),
)
except NoResultError:
raise tornado.web.HTTPError(404)
slug = entry.slug
await self.execute(
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",
title,
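Note on the finding above: the flagged call passes `title`, `text`, `html` and `int(id)` as separate arguments next to a statement that contains only `%s` placeholders. Whether this is exploitable depends on how `self.execute` hands those values to the database driver, and the helper's body is not shown in this report. The sketch below illustrates driver-side parameter binding under stated assumptions: the standard-library `sqlite3` module stands in for the real driver (so the placeholder is `?` rather than `%s`), and `update_entry` and the table layout are hypothetical names used only for illustration.

```python
import sqlite3


def update_entry(conn, entry_id, title, text, html):
    """Update one entry; values are bound by the driver, never spliced into the SQL text."""
    conn.execute(
        "UPDATE entries SET title = ?, markdown = ?, html = ? WHERE id = ?",
        (title, text, html, int(entry_id)),
    )


# Hypothetical in-memory table, loosely mirroring the columns named in the report.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entries (id INTEGER PRIMARY KEY, title TEXT, markdown TEXT, html TEXT)"
)
conn.execute("INSERT INTO entries (id, title, markdown, html) VALUES (1, 'old', '', '')")
conn.execute("INSERT INTO entries (id, title, markdown, html) VALUES (2, 'other', '', '')")

# A title that would rewrite the statement if it were concatenated into the SQL string.
hostile = "x', html = 'pwned' WHERE 1=1 --"
update_entry(conn, "1", hostile, "body", "<p>body</p>")

rows = conn.execute("SELECT id, title, html FROM entries ORDER BY id").fetchall()
```

With binding, the hostile title is stored as plain data in row 1 and row 2 is left untouched. If the real helper instead formats the values into the statement string, the finding should be treated as confirmed.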
| Vulnerable code | 217 self.execute( "INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)" "VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)", self.current_user.id, title, slug, text, html, ) |
|
|
|
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title
= self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
int(id),
)
else:
slug = unicodedata.normalize("NFKD",
title
)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
int(id),
)
else:
slug =
unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
int(id),
)
else:
slug
= unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
)
else:
slug = unicodedata.normalize("NFKD", title)
slug
= re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
)
else:
slug = unicodedata.normalize("NFKD", title)
slug =
re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
)
else:
slug = unicodedata.normalize("NFKD", title)
slug
= re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(
slug.lower().strip().split
())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(
slug.lower().strip().split()
)
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug =
"-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug
= "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug =
slug.encode("ascii", "ignore").decode
("ascii")
if not slug:
slug = "entry"
while True:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug =
slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
while True:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug
= slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
while True:
e = await self.query("SELECT * FROM entries WHERE slug = %s", slug)
if not e:
break
slug
+= "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
e = await self.query("SELECT * FROM entries WHERE slug = %s", slug)
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
title,
slug
,
text,
html,
)
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
|
|
|
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument
("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title =
self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title
= self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
title
,
slug,
text,
html,
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
|
|
|
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text
= self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(
text
)
if id:
try:
entry = await self.queryone(
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html =
markdown.markdown(text)
if id:
try:
entry = await self.queryone(
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html
= markdown.markdown(text)
if id:
try:
entry = await self.queryone(
title,
slug,
text,
html
,
)
self.redirect("/entry/" + slug)
class AuthCreateHandler(BaseHandler):
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
|
|
|
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument
("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text =
self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text
= self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
self.current_user.id,
title,
slug,
text
,
html,
)
self.redirect("/entry/" + slug)
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",
self.current_user.id,
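Note on the trace above: one of the tainted values reaching the `INSERT` is `slug`, which is derived from the user-supplied `title` through the normalization steps shown in the trace. The sketch below collects those steps into a single function so their combined effect can be checked. The function name `make_slug` is hypothetical, and the uniqueness loop that appends `-2` is left out because it needs a database.

```python
import re
import unicodedata


def make_slug(title):
    """Reproduce the slug steps shown in the trace (name is illustrative)."""
    slug = unicodedata.normalize("NFKD", title)
    # Collapse every run of non-word characters into a single space.
    slug = re.sub(r"[^\w]+", " ", slug)
    # Lower-case, trim, and join the remaining words with hyphens.
    slug = "-".join(slug.lower().strip().split())
    # Drop anything that is not representable in ASCII.
    slug = slug.encode("ascii", "ignore").decode("ascii")
    if not slug:
        slug = "entry"
    return slug


plain = make_slug("Hello, World!")
empty = make_slug("!!!")
hostile = make_slug("'; DROP TABLE entries; --")
```

After these steps the slug appears limited to lower-case ASCII letters, digits, underscores and hyphens, so quoting characters from `title` do not survive into `slug`. That does not cover `title`, `text` or `html`, which reach the statement unchanged and rely entirely on parameter binding.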
| Vulnerable code | 243 os.execv(sys.executable, [sys.executable] + argv) |
|
|
|
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv =
sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv =
sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv
= sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] +
argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv =
["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
os._exit(0)
else:
try:
... os.execv(sys.executable, [sys.executable] +
argv
)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
os._exit(0)
else:
try:
os.execv(sys.executable,
[sys.executable] + argv
)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
os._exit(0)
else:
try:
os.execv(sys.executable, [sys.executable] + argv)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
|
|
|
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] +
argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv =
["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
os._exit(0)
else:
try:
... os.execv(sys.executable, [sys.executable] +
argv
)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
os._exit(0)
else:
try:
os.execv(sys.executable,
[sys.executable] + argv
)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
os._exit(0)
else:
try:
os.execv(sys.executable, [sys.executable] + argv)
except OSError:
# Mac OS X versions prior to 10.6 do not support execv in
# a process that contains multiple threads. Instead of
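Note on the finding above: the flagged call re-executes the current interpreter with an argument list built from `sys.argv`. Arguments passed as a list to an exec-style call are handed to the new process one by one, without a shell in between. The sketch below demonstrates that behaviour with `subprocess.run` instead of `os.execv`, so that the calling process survives and the result can be inspected; `rerun_with_args` is a hypothetical helper and is not part of the scanned code.

```python
import json
import subprocess
import sys


def rerun_with_args(extra_args):
    """Start a child interpreter with a list argv and return what the child received."""
    child_code = "import json, sys; print(json.dumps(sys.argv[1:]))"
    result = subprocess.run(
        [sys.executable, "-c", child_code] + list(extra_args),
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)


# Shell metacharacters arrive as literal strings because no shell parses them.
received = rerun_with_args(["--port=8888", "; echo injected", "$(id)"])
```

The child sees exactly the strings it was given. The remaining question for triage is who controls `sys.argv` and `sys.executable` in the deployment; if only the operator who started the process does, the flagged call restarts the program with the same arguments it was launched with.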
| Vulnerable code | 256 os.spawnv( # type: ignore os.P_NOWAIT, sys.executable, [sys.executable] + argv ) |
|
|
|
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv =
sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv =
sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv
= sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] +
argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv =
["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv
= ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(
# appear to be consistent, so we can't easily check for
# this error specifically.
os.spawnv( # type: ignore
... os.P_NOWAIT, sys.executable, [sys.executable] +
argv
)
# At this point the IOLoop has been closed and finally
# blocks will experience errors if we allow the stack to
# appear to be consistent, so we can't easily check for
# this error specifically.
os.spawnv( # type: ignore
os.P_NOWAIT, sys.executable,
[sys.executable] + argv
)
# At this point the IOLoop has been closed and finally
# blocks will experience errors if we allow the stack to
# Unfortunately the errno returned in this case does not
# appear to be consistent, so we can't easily check for
# this error specifically.
os.spawnv( # type: ignore
os.P_NOWAIT, sys.executable, [sys.executable] + argv
)
# At this point the IOLoop has been closed and finally
|
|
|
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(

# Unfortunately the errno returned in this case does not
# appear to be consistent, so we can't easily check for
# this error specifically.
os.spawnv( # type: ignore
os.P_NOWAIT, sys.executable, [sys.executable] + argv
)
# At this point the IOLoop has been closed and finally
# blocks will experience errors if we allow the stack to
| Vulnerable code | 239 subprocess.Popen([sys.executable] + argv) |
|
|
|
argv = _original_argv
else:
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(

):
os.environ["PYTHONPATH"] = path_prefix + os.environ.get("PYTHONPATH", "")
if not _has_execv:
subprocess.Popen([sys.executable] + argv)
os._exit(0)
else:
try:
|
|
|
spec = getattr(sys.modules["__main__"], "__spec__", None)
argv = sys.argv
if spec:
argv = ["-m", spec.name] + argv[1:]
else:
path_prefix = "." + os.pathsep
if sys.path[0] == "" and not os.environ.get("PYTHONPATH", "").startswith(

):
os.environ["PYTHONPATH"] = path_prefix + os.environ.get("PYTHONPATH", "")
if not _has_execv:
subprocess.Popen([sys.executable] + argv)
os._exit(0)
else:
try:
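The trace above ends in `subprocess.Popen([sys.executable] + argv)`. As a rough illustration of how that argument list is put together, the sketch below rebuilds it as a pure function. `build_restart_command` and its parameters are hypothetical names used only for this illustration; they are not taken from the scanned project. The command is a list of separate arguments, and `subprocess.Popen` runs such a list without a shell when `shell` is left at its default.

```python
import sys


def build_restart_command(argv, module_name=None, executable=sys.executable):
    """Return the argument list used to re-run the current program.

    Hypothetical helper for illustration; mirrors the steps in the trace.
    """
    if module_name:
        # Started as "python -m module_name": keep that form and
        # drop the original script path in argv[0].
        args = ["-m", module_name] + list(argv[1:])
    else:
        args = list(argv)
    # Each element is passed to the child process as one argument.
    return [executable] + args
```

A reviewer could use a helper like this to check which parts of the command come from `sys.argv` before deciding whether the finding is exploitable in a given deployment.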
| Vulnerable code | 92 self.redirect(self.get_argument("next", "/")) |
|
|
|
code=self.get_argument("code"),
)
self.set_secure_cookie("fbdemo_user", tornado.escape.json_encode(user))
self.redirect(self.get_argument("next", "/"))
return
self.authorize_redirect(
redirect_uri=my_url,
| Vulnerable code | 104 self.redirect(self.get_argument("next", "/")) |
|
|
|
class AuthLogoutHandler(BaseHandler, tornado.auth.FacebookGraphMixin):
def get(self):
self.clear_cookie("fbdemo_user")
self.redirect(self.get_argument("next", "/"))
class PostModule(tornado.web.UIModule):
def render(self, post):
return self.render_string("modules/post.html", post=post)
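The findings above pass the `next` request argument straight to `self.redirect`. One common mitigation, sketched here under the assumption that only same-site relative paths should be accepted, is to validate the value first. `safe_next_url` is a hypothetical helper name and is not part of Tornado or the scanned demos.

```python
from urllib.parse import urlsplit


def safe_next_url(candidate, default="/"):
    """Return candidate only if it is a same-site absolute path, else default.

    Hypothetical helper for illustration only.
    """
    if not candidate:
        return default
    # Backslashes and control characters are handled inconsistently
    # by browsers, so reject them outright.
    if "\\" in candidate or any(ord(ch) < 0x20 for ch in candidate):
        return default
    try:
        parts = urlsplit(candidate)
    except ValueError:
        return default
    # Any scheme or host means the target is not a local path.
    if parts.scheme or parts.netloc:
        return default
    # Require a single leading slash; "//host" is scheme-relative.
    if not candidate.startswith("/") or candidate.startswith("//"):
        return default
    return candidate
```

A handler could then call `self.redirect(safe_next_url(self.get_argument("next", "/")))`. Whether this is appropriate depends on whether the application ever needs to redirect to another origin.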
| Vulnerable code | 79 self.redirect(self.get_argument("next", "/")) |
|
|
|
user = yield self.get_authenticated_user()
del user["description"]
self.set_secure_cookie(self.COOKIE_NAME, json_encode(user))
self.redirect(self.get_argument("next", "/"))
else:
yield self.authorize_redirect(callback_uri=self.request.full_url())
class LogoutHandler(BaseHandler):
|
|
|
self.render("compose.html", entry=entry)
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:
try:
entry = await self.queryone(
"SELECT * FROM entries WHERE id = %s", int(id)
)
except NoResultError:
raise tornado.web.HTTPError(404)
slug = entry.slug
await self.execute(
"UPDATE entries SET title = %s, markdown = %s, html = %s "
"WHERE id = %s",

else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
while True:

e = await self.query("SELECT * FROM entries WHERE slug = %s", slug)
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",

text,
html,
)
self.redirect("/entry/" + slug)
class AuthCreateHandler(BaseHandler):
def get(self):
self.render("create_author.html")
|
|
|
@tornado.web.authenticated
async def post(self):
id = self.get_argument("id", None)
title = self.get_argument("title")
text = self.get_argument("markdown")
html = markdown.markdown(text)
if id:

int(id),
)
else:
slug = unicodedata.normalize("NFKD", title)
slug = re.sub(r"[^\w]+", " ", slug)
slug = "-".join(slug.lower().strip().split())
slug = slug.encode("ascii", "ignore").decode("ascii")
if not slug:
slug = "entry"
while True:

e = await self.query("SELECT * FROM entries WHERE slug = %s", slug)
if not e:
break
slug += "-2"
await self.execute(
"INSERT INTO entries (author_id,title,slug,markdown,html,published,updated)"
"VALUES (%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)",

text,
html,
)
self.redirect("/entry/" + slug)
class AuthCreateHandler(BaseHandler):
def get(self):
self.render("create_author.html")
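The trace above follows `title` through the slug-building steps into `self.redirect("/entry/" + slug)`. To make it easier to see what those steps let through, here is a minimal standalone sketch of the same transformations. `make_slug` is a hypothetical wrapper name; the body repeats only the statements shown in the trace and omits the database uniqueness loop.

```python
import re
import unicodedata


def make_slug(title):
    """Reproduce the slug steps from the trace as a pure function."""
    # Decompose accented characters so the base letters survive.
    slug = unicodedata.normalize("NFKD", title)
    # Replace every run of non-word characters with a single space.
    slug = re.sub(r"[^\w]+", " ", slug)
    # Lowercase, trim, and join the remaining words with hyphens.
    slug = "-".join(slug.lower().strip().split())
    # Drop anything that is not ASCII.
    slug = slug.encode("ascii", "ignore").decode("ascii")
    # Fall back to a fixed slug when nothing is left.
    return slug or "entry"
```

For example, `make_slug("http://evil.example/?x=1")` yields `http-evil-example-x-1`, so characters such as `:`, `/` and `?` do not reach the redirect target on this path. A reviewer may want to take that into account when triaging the finding.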
| Vulnerable code | 250 self.redirect(self.get_argument("next", "/")) |
|
|
|
tornado.escape.to_unicode(hashed_password),
)
self.set_secure_cookie("blogdemo_user", str(author.id))
self.redirect(self.get_argument("next", "/"))
class AuthLoginHandler(BaseHandler):
async def get(self):
# If there are no authors, redirect to the account creation page.
| Vulnerable code | 278 self.redirect(self.get_argument("next", "/")) |
|
|
|
hashed_password = tornado.escape.to_unicode(hashed_password)
if hashed_password == author.hashed_password:
self.set_secure_cookie("blogdemo_user", str(author.id))
self.redirect(self.get_argument("next", "/"))
else:
self.render("login.html", error="incorrect password")
class AuthLogoutHandler(BaseHandler):
| Vulnerable code | 286 self.redirect(self.get_argument("next", "/")) |
|
|
|
class AuthLogoutHandler(BaseHandler):
def get(self):
self.clear_cookie("blogdemo_user")
self.redirect(self.get_argument("next", "/"))
class EntryModule(tornado.web.UIModule):
def render(self, entry):
return self.render_string("modules/entry.html", entry=entry)
| Vulnerable code | 78 self.redirect(self.get_argument("next")) |
|
|
|
self.render_string("message.html", message=message)
)
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
self.write(message)
global_message_buffer.add_message(message)
| Vulnerable code | 61 self.redirect(self.get_argument("url"), status=int(self.get_argument("status", "302"))) |
|
|
|
class RedirectHandler(RequestHandler):
def prepare(self):
self.write("redirects can have bodies too")
self.redirect(
self.get_argument("url"), status=int(self.get_argument("status", "302"))
)
class RedirectWithoutLocationHandler(RequestHandler):
def prepare(self):
|
|
|
class RedirectHandler(RequestHandler):
def prepare(self):
self.write("redirects can have bodies too")
self.redirect(
self.get_argument("url"), status=int(self.get_argument("status", "302"))
)
class RedirectWithoutLocationHandler(RequestHandler):
def prepare(self):
| Vulnerable code | 564 self.redirect(url_concat(self.get_argument("redirect_uri"), dict(code=code))) |
|
|
|
def get(self):
# issue a fake auth code and redirect to redirect_uri
code = "fake-authorization-code"
self.redirect(url_concat(self.get_argument("redirect_uri"), dict(code=code)))
class GoogleOAuth2TokenHandler(RequestHandler):
def post(self):
assert self.get_argument("code") == "fake-authorization-code"
| Vulnerable code | 652 self.redirect("/", permanent=int(self.get_argument("permanent"))) |
|
|
|
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
| Vulnerable code | 654 self.redirect("/", status=int(self.get_argument("status"))) |
|
|
|
if self.get_argument("permanent", None) is not None:
self.redirect("/", permanent=int(self.get_argument("permanent")))
elif self.get_argument("status", None) is not None:
self.redirect("/", status=int(self.get_argument("status")))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
| Vulnerable code | 22 self.redirect(path, status=int(self.get_argument('status', '302'))) |
|
|
|
self.write("Hello world")
class RedirectHandler(RequestHandler):
def get(self, path):
self.redirect(path, status=int(self.get_argument('status', '302')))
class PostHandler(RequestHandler):
def post(self):
assert self.get_argument('foo') == 'bar'
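Several of the findings above convert a request argument with `int(...)` and pass it on as the redirect `status`. A minimal sketch of a stricter parse follows, assuming that only the standard redirect codes should be accepted and that anything else falls back to a default. `parse_redirect_status` and `ALLOWED_REDIRECT_STATUSES` are hypothetical names and do not come from the scanned code.

```python
# Redirect status codes this sketch accepts (an assumption of the example).
ALLOWED_REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})


def parse_redirect_status(raw, default=302):
    """Parse a user-supplied status value, falling back to default.

    Hypothetical helper for illustration only.
    """
    try:
        status = int(raw)
    except (TypeError, ValueError):
        # Missing or non-numeric input: use the default instead of raising.
        return default
    return status if status in ALLOWED_REDIRECT_STATUSES else default
```

The handlers in these traces appear to be test fixtures, so such hardening may not be needed there. The sketch only shows one way to keep an unexpected value from reaching the redirect call in application code.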
| Vulnerable code | 710 _OAUTH_REQUEST_TOKEN_URL |
|
|
|
and all of the custom Twitter user attributes described at
https://dev.twitter.com/docs/api/1.1/get/users/show
"""
_OAUTH_REQUEST_TOKEN_URL = "https://api.twitter.com/oauth/request_token"
_OAUTH_ACCESS_TOKEN_URL = "https://api.twitter.com/oauth/access_token"
_OAUTH_AUTHORIZE_URL = "https://api.twitter.com/oauth/authorize"
_OAUTH_AUTHENTICATE_URL = "https://api.twitter.com/oauth/authenticate"
| Vulnerable code | 711 _OAUTH_ACCESS_TOKEN_URL |
https://dev.twitter.com/docs/api/1.1/get/users/show
"""
_OAUTH_REQUEST_TOKEN_URL = "https://api.twitter.com/oauth/request_token"
_OAUTH_ACCESS_TOKEN_URL = "https://api.twitter.com/oauth/access_token"
_OAUTH_AUTHORIZE_URL = "https://api.twitter.com/oauth/authorize"
_OAUTH_AUTHENTICATE_URL = "https://api.twitter.com/oauth/authenticate"
_OAUTH_NO_CALLBACKS = False
| Vulnerable code | 854 _OAUTH_ACCESS_TOKEN_URL |
.. versionadded:: 3.2
"""
_OAUTH_AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_OAUTH_ACCESS_TOKEN_URL = "https://www.googleapis.com/oauth2/v4/token"
_OAUTH_USERINFO_URL = "https://www.googleapis.com/oauth2/v1/userinfo"
_OAUTH_NO_CALLBACKS = False
_OAUTH_SETTINGS_KEY = "google_oauth"
| Vulnerable code | 927 _OAUTH_ACCESS_TOKEN_URL |
return escape.json_decode(response.body)
class FacebookGraphMixin(OAuth2Mixin):
"""Facebook authentication using the new Graph API and OAuth2."""
_OAUTH_ACCESS_TOKEN_URL = "https://graph.facebook.com/oauth/access_token?"
_OAUTH_AUTHORIZE_URL = "https://www.facebook.com/dialog/oauth?"
_OAUTH_NO_CALLBACKS = False
_FACEBOOK_BASE_URL = "https://graph.facebook.com"

username, password = self.parsed.username, self.parsed.password
elif self.request.auth_username is not None:
username = self.request.auth_username
password = self.request.auth_password or ""
if username is not None:
assert password is not None
if self.request.auth_mode not in (None, "basic"):

)
# This test was shared with wsgi_test.py; now the name is meaningless.
class WSGISafeWebTest(WebTestCase):
COOKIE_SECRET = "WebTest.COOKIE_SECRET"
def get_app_kwargs(self):
loader = DictLoader(
{

self.skipTest("requires HTTP/1.x")
self.assertEqual(response.code, 599)
class SignedValueTest(unittest.TestCase):
SECRET = "It's a secret to everybody"
SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
def past(self):
return self.present() - 86400 * 32

self.assertEqual(response.code, 599)
class SignedValueTest(unittest.TestCase):
SECRET = "It's a secret to everybody"
SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
def past(self):
return self.present() - 86400 * 32
def present(self):

self.assertEqual(response.code, 599)
class SignedValueTest(unittest.TestCase):
SECRET = "It's a secret to everybody"
SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
def past(self):
return self.present() - 86400 * 32
def present(self):

self.assertEqual(response.code, 599)
class SignedValueTest(unittest.TestCase):
SECRET = "It's a secret to everybody"
SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
def past(self):
return self.present() - 86400 * 32
def present(self):
Glossary
jQuery: adding HTML code to the DOM
A selector vulnerability that adds HTML code to the page's DOM structure.
jQuery: unsafe DOM function
An unsafe function that adds HTML code to the DOM structure.
SQL injection
A web application may build SQL queries from data received from the user. If that data is not sufficiently sanitized, an attacker can inject SQL operators and thereby change the logic of the query processed by the database. In some cases the attacker is not limited to reading information from the database: it may also be possible to read arbitrary files, create new ones, and carry out denial-of-service attacks. By developing the attack further, the attacker can obtain the web application's source code, or upload and later execute malicious code.
Example of a vulnerable script:
if (isset($_GET['id'])) {
    try {
        $dbh = new PDO('mysql:dbname=test;host=127.0.0.1', 'root', 'pass');
    } catch (PDOException $e) {
        echo "Connection failed\n";
        exit;
    }
    $stm = $dbh->query("SELECT * FROM users WHERE id=" . $_GET['id']);
    $user = $stm->fetch();
    echo "User name: " . $user['name'] . "\n";
}
An attacker can retrieve arbitrary data, such as the database version, by passing the following string in the id parameter: 0+union+select+1,version().
Figure 1. Example of an SQL injection attack
Recommendations
To protect against SQL injection attacks, cast the value of the variable to an integer type.
if (isset($_GET['id'])) {
    try {
        $dbh = new PDO('mysql:dbname=test;host=127.0.0.1', 'root', 'pass');
    } catch (PDOException $e) {
        echo "Connection failed\n";
        exit;
    }
    $stm = $dbh->query("SELECT * FROM users WHERE id=" . (int)$_GET['id']);
    $user = $stm->fetch();
    echo "User name: " . $user['name'] . "\n";
}
A good solution is to use parameterized SQL queries in the application code instead of simply concatenating variables with a query template.
if (isset($_GET['id'])) {
    try {
        $dbh = new PDO('mysql:dbname=test;host=127.0.0.1', 'root', 'pass');
    } catch (PDOException $e) {
        echo "Connection failed\n";
        exit;
    }
    $stm = $dbh->prepare("SELECT name FROM users WHERE id=?");
    $stm->execute(array($_GET['id']));
    $user = $stm->fetch(PDO::FETCH_ASSOC);
    echo "User name: " . $user['name'] . "\n";
}
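Since the scanned project is written in Python, the same parameterized-query idea can be sketched with the standard-library sqlite3 module. This is only an illustration under stated assumptions: the users table, its contents, and the fetch_user_name helper are hypothetical and do not come from the scanned code.

```python
import sqlite3


def fetch_user_name(conn: sqlite3.Connection, user_id: str):
    """Return the name stored for user_id, or None if no row matches."""
    # The "?" placeholder passes user_id to the database as data,
    # so it is never parsed as part of the SQL text.
    row = conn.execute(
        "SELECT name FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    return row[0] if row else None


# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (id, name) VALUES (1, 'alice')")

legit = fetch_user_name(conn, "1")
attack = fetch_user_name(conn, "0 union select 1,version()")
```

With a bound parameter, the injection string from the example above is compared to the id column as an ordinary value and simply matches no row.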
In addition, follow the principle of least privilege: make sure that the account created for the web application's interaction with the database has a minimal set of rights and that no additional privileges are available to it.
Use of an assert statement
Use of an assert statement was detected. Such code is removed when the bytecode is optimized. Make sure that its removal does not affect important operations in the code.
Use of urllib
urllib can operate not only on external links but also on local files. This may lead to unwanted execution of files on the local machine, especially if the call is made based on user input.
Use of a hard-coded password
The software contains a hard-coded password that is used for authentication or for communication with external components. A hard-coded password can cause serious authentication failures that are difficult for a system administrator to detect. Once discovered, such flaws can be hard to fix, so the administrator may have to block access to the product entirely. There are two main use cases: "internal", where the authentication mechanism in the software checks a hard-coded password; and "external", where a hard-coded password is used to connect the software to other systems or components.
In the internal case, a default administrator account is created in the product with a simple hard-coded password. This password is the same for every installation of the product and usually cannot be changed or disabled by the system administrator without modifying the program manually or installing a patch. If the password becomes known or is published (for example, on the Internet), anyone who knows it can access the product. Mass attacks, for example by worms, also become possible, because every installation of the software, regardless of organization, has the same password.
The external case involves authentication between front-end systems and a back-end service. The back-end service may be accessed with a fixed password that is easy to discover. The back-end credentials may be hard-coded by the developer into the front-end software, and any user of the program can extract the password. Client-side systems with hard-coded passwords pose a more serious threat, because extracting a password from a binary file is usually not difficult.
Use of a deprecated function
The code uses discouraged or deprecated functions, which may indicate that the code is not being reviewed or maintained. As programming languages evolve, functions can become deprecated for the following reasons:
* improvements made to the language;
* improved efficiency and security of the operations performed;
* changes in the rules that describe how certain functions work.
Removed functions are usually replaced by counterparts that perform the same tasks in a different and, as a rule, more efficient way.
Cross-site scripting
A web application is vulnerable when it receives data from an untrusted source and outputs it in the web server's response without the necessary processing. An attacker can carry out this attack to execute a malicious script in the context of the domain to which the request was sent. Typically, the malicious code sends the attacker sensitive user information, such as the session identifier.
Example of a vulnerable script:
if (isset($_GET['name'])) {
    echo "Hello {$_GET['name']}!\n";
}
An attacker can trick a user into following the address /xss/reflected/?name=%3Cscript%3Ealert("xss")%3C/script%3E. The following code will be injected into the web server's response: <script>alert("xss")</script>. The result of the attack is shown in the figure:
Figure 1. Example of the attack.
A web application may have vulnerabilities that allow cross-site scripting attacks not only where data from the HTTP request is output, but also where results of database queries, file contents, and so on are output.
Recommendations
To eliminate the vulnerability, escape special characters according to the context into which the data is inserted.
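For the HTML body context, escaping can be sketched in Python with the standard-library html.escape function. The greet helper is a hypothetical name used only for illustration, and other contexts (JavaScript, URLs, CSS) need their own escaping rules, which this sketch does not cover.

```python
import html


def greet(name: str) -> str:
    """Build a greeting with the user-supplied name escaped for HTML text."""
    # html.escape replaces &, <, > and, with quote=True, both quote characters.
    return "Hello {}!".format(html.escape(name, quote=True))


payload = '<script>alert("xss")</script>'
safe_fragment = greet(payload)
# safe_fragment == 'Hello &lt;script&gt;alert(&quot;xss&quot;)&lt;/script&gt;!'
```

The browser renders the escaped payload as text instead of executing it.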
Insecure SSL settings
Using these options can weaken the protection of the application's data in transit.
Improper neutralization of directives in dynamically evaluated code (eval injection)
The software receives input from an upstream component but does not neutralize, or incorrectly neutralizes, code syntax before using the input in dynamic evaluation (for example, in a call to eval). This allows attackers to execute arbitrary code or, at the very least, to influence which code is executed.
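When the input is only expected to be a data literal, one possible mitigation in Python is to parse it with ast.literal_eval instead of eval. The parse_literal helper below is a hypothetical sketch, not code from the scanned project. It accepts only literals (numbers, strings, tuples, lists, dicts, sets, booleans, None) and rejects anything that would call a function.

```python
import ast


def parse_literal(text: str):
    """Parse a Python literal without executing code; return None on rejection."""
    try:
        # literal_eval only builds literal structures and never calls functions.
        return ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError):
        return None


numbers = parse_literal("[1, 2, 3]")
rejected = parse_literal("__import__('os').system('id')")
```

Note that a None result is ambiguous in this sketch, because the literal None parses successfully to the same value; a production version would likely raise instead. Very large or deeply nested inputs can still exhaust memory or the recursion limit, so input size should be bounded separately.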
Open redirect
A web application may redirect users. In some cases the address to which the user is redirected is built from data supplied by the user.
Example of a vulnerable script:
if (isset($_GET['redirect'])) {
    header('Location: ' . $_GET['redirect']);
}
If a user follows a link received from an attacker, the user is redirected to a specially crafted address. Example link: http://trusted.com/?redirect=http://evil.com. To hide the redirect address, the parameter can be encoded, in which case the link looks like this: http://trusted.com/?redirect=%68%74%74%70%3A%2F%2F%65%76%69%6C%2E%63%6F%6D.
This vulnerability can be used for phishing attacks, for bypassing protection mechanisms during "HTTP request forgery" attacks, and so on.
Recommendations
Validate redirect targets against a whitelist of domain names and protocols. It is also recommended to show a warning page about leaving for another site; the user should be redirected only after a long delay or upon clicking a link.
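A whitelist check of this kind might look like the following Python sketch. The allowed schemes and hosts are hypothetical placeholders, and is_allowed_redirect is an illustrative name rather than part of any framework.

```python
from urllib.parse import urlsplit

# Hypothetical policy: adjust to the hosts the application really trusts.
ALLOWED_SCHEMES = {"https"}
ALLOWED_HOSTS = {"trusted.com", "www.trusted.com"}


def is_allowed_redirect(target: str) -> bool:
    """Return True only for absolute URLs whose scheme and host are whitelisted."""
    try:
        parts = urlsplit(target)
        host = parts.hostname  # lower-cased, without userinfo or port
    except ValueError:
        return False
    return parts.scheme in ALLOWED_SCHEMES and host in ALLOWED_HOSTS
```

Comparing the parsed hostname, rather than searching the raw string for a trusted domain, rejects targets such as https://trusted.com@evil.com/, where the trusted name appears only in the userinfo part. This sketch also rejects relative paths; an application that needs them would handle that case separately.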
Log file entry forgery
The application insufficiently validates data coming from the user, which allows an attacker to forge an entry in the log file or inject malicious code into it.
A web application may build log file entries from data received from the user. If that data is not sufficiently sanitized, an attacker can inject the characters used to separate entries and thereby forge a log entry or corrupt the log, making automated processing of the file impossible. The attacker can also inject malicious code that may be used to develop the attack further.
Below is an example of a vulnerable script:
$result = $authService->authenticate($login, $password);
if (!$result->isValid()) {
    $log->write("Authentication failed for user '$login'");
}
Recommendations
Sanitize data written to the log file by removing or escaping the characters used for markup.
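One way to escape entry separators in Python is sketched below. The sanitize_for_log helper is a hypothetical name, and the sketch assumes a line-oriented log where CR, LF, and other non-printable characters are the markup that matters.

```python
def sanitize_for_log(value: str) -> str:
    """Escape non-printable characters so one event always stays on one line."""
    return "".join(
        # Printable characters pass through; others become escapes such as \n.
        ch if ch.isprintable() else ch.encode("unicode_escape").decode("ascii")
        for ch in value
    )


# A login value that tries to append a forged second entry.
login = "bob'\nAuthentication succeeded for user 'admin"
entry = "Authentication failed for user '{}'".format(sanitize_for_log(login))
```

The resulting entry contains the two characters backslash and n instead of a real line break, so a log parser still sees a single record.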
Server-side request forgery
Having received a URL or an HTTP message, the application insufficiently validates the destination address before sending the request. By exploiting this flaw, an attacker can send requests to servers with restricted access (for example, computers on the local network). This can lead to disclosure of sensitive data, to the attacker obtaining the application's source code, to denial of service, and so on.
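A destination check can be sketched in Python with the standard-library ipaddress and socket modules. The is_public_destination helper is a hypothetical name, and the sketch assumes that only http and https URLs to globally routable addresses are acceptable. It does not protect against the host resolving to a different address when the real request is made, so the validated address should be the one actually connected to.

```python
import ipaddress
import socket
from urllib.parse import urlsplit


def is_public_destination(url: str) -> bool:
    """Return True only if every address the host resolves to is globally routable."""
    try:
        parts = urlsplit(url)
        if parts.scheme not in ("http", "https") or not parts.hostname:
            return False
        port = parts.port or (443 if parts.scheme == "https" else 80)
        infos = socket.getaddrinfo(parts.hostname, port, proto=socket.IPPROTO_TCP)
        # info[4][0] is the resolved address; drop any IPv6 zone suffix ("%eth0").
        addresses = [ipaddress.ip_address(info[4][0].split("%")[0]) for info in infos]
    except (ValueError, OSError):
        # Malformed URL, invalid port, or a host that does not resolve.
        return False
    return bool(addresses) and all(addr.is_global for addr in addresses)
```

Loopback, private, and link-local destinations such as http://127.0.0.1/ or http://10.0.0.5:8080/ are rejected because their addresses are not global.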
Empty exception handling block
Weak error handling: an empty exception handling block. Ignoring an exception can cause the program to overlook unexpected states and conditions.
Information disclosure in static files or constants
HTML documents or string constants may contain sensitive information. The presence of such information in these places should be limited.
Path disclosure
The application uses external data to build the path to a file within a specific directory, but does not check the data for potentially unsafe substrings, such as ".." or absolute paths to other directories such as "/abs/path". This can lead to unauthorized access to other files.
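A containment check for such paths can be sketched in Python with pathlib. BASE_DIR and safe_join are hypothetical names chosen for illustration. The idea is to resolve the combined path first and only then test whether it is still inside the base directory.

```python
from pathlib import Path
from typing import Optional

# Hypothetical directory that user-requested files must stay inside.
BASE_DIR = Path("/srv/app/static")


def safe_join(base: Path, user_path: str) -> Optional[Path]:
    """Return the resolved path if it stays inside base, otherwise None."""
    base = base.resolve()
    # resolve() collapses ".." segments; joining an absolute user_path
    # replaces base entirely, which the containment test below catches.
    candidate = (base / user_path).resolve()
    if candidate == base or base in candidate.parents:
        return candidate
    return None


inside = safe_join(BASE_DIR, "css/site.css")
outside = safe_join(BASE_DIR, "../../etc/passwd")
```

Checking the resolved path avoids having to enumerate unsafe substrings by hand.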
Static random number generator
Static random number generators, unlike cryptographic ones, do not provide the required randomness of output sequences, so attackers can mount a guessing attack.
Remote code execution
A web application may build executable code from data received from the user. If special characters that can change the syntax or behavior of the intended code are not handled, an attacker can execute arbitrary code.
Example of a vulnerable script:
if (isset($_GET['documents'])) {
    $documents = eval('return array(' . $_GET['documents'] . ');');
    foreach ($documents as $document) {
        // Processing document ...
    }
}
In this example an attacker can pass malicious code to the script, and the application will execute it. The figure shows an example attack in which the standard phpinfo function is called:
Figure 1. An arbitrary code execution attack
Recommendations
To eliminate the vulnerability, refactor the web application so that it contains no sections that dynamically generate executable code. It is also recommended to run code in an isolated environment.
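As one possible refactoring of the documents example, the list can be sent as JSON and parsed as data, so no executable code is ever built from the input. The sketch below is in Python; parse_documents is a hypothetical helper, and the JSON array-of-strings format is an assumption made for illustration.

```python
import json


def parse_documents(raw: str) -> list:
    """Parse a JSON array of document names; the input is never evaluated as code."""
    try:
        documents = json.loads(raw)
    except json.JSONDecodeError:
        return []
    # Accept only a flat list of strings; anything else is treated as invalid.
    if not isinstance(documents, list):
        return []
    if not all(isinstance(doc, str) for doc in documents):
        return []
    return documents


valid = parse_documents('["a.txt", "b.txt"]')
invalid = parse_documents("__import__('os').system('id')")
```

Input that is not a valid JSON array of strings yields an empty list instead of being executed.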